feat(partitions): add consensus per partition and extra#3071
Codecov Report

❌ The patch check has failed because the patch coverage (0.42%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Coverage Diff:
## master #3071 +/- ##
============================================
- Coverage 70.52% 70.23% -0.29%
Complexity 943 943
============================================
Files 1115 1115
Lines 95388 95274 -114
Branches 72589 72490 -99
============================================
- Hits 67275 66920 -355
- Misses 25634 25859 +225
- Partials 2479 2495 +16
```rust
    offset: u64,
) -> Result<(), IggyError> {
    let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset);
    self.persist_consumer_offset_commit(pending).await?;
```
consumer offsets are persisted to disk here during the prepare phase (before quorum), unlike SendMessages which only writes to the in-memory journal during prepare and persists to disk during commit (commit_messages).
if the prepare never commits (leader crash, view change), disk has uncommitted values with no rollback mechanism. on restart, the uncommitted offset file is loaded as if it were committed.
recommended fix: move the persist_consumer_offset_commit call from persist_and_stage_* (prepare phase) to commit_consumer_offset_entry (commit phase). during prepare, only stage in-memory. this eliminates the rollback problem entirely and aligns with the SendMessages pattern.
I think the behavior of treating a written offset as committed by default is fine, since in the scenario where we restart a replica after a crash we perform state transfer, which would overwrite the currently committed offset for that particular consumer.
```rust
let last_offset =
    last_matching_offset.expect("non-empty poll result must have a last offset");
if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
if let Err(err) = self
```
poll_messages with auto_commit=true calls store_consumer_offset_and_persist which persists and applies directly without going through consensus replication. this creates three divergent write paths for consumer offsets:

- replicated via StoreConsumerOffset operation (new in this PR): full prepare/commit cycle
- local-only persist + apply via auto-commit here: no replication
- in-memory-only via Partition::store_consumer_offset trait method (line 473): no persistence, no replication

on failover, the new leader has no record of auto-committed offsets since they were never replicated. this is a pre-existing pattern, but now inconsistent since explicit StoreConsumerOffset IS replicated. worth documenting whether this is intentional (local optimization for consumer progress) or should be migrated to the consensus path.
I think for auto-commit polls, at the end of the command_handler we should construct a new Message<Prepare> with a StoreConsumerOffset command and run it through the on_message handler, the same way a StoreConsumerOffset client request would be.

Additionally, I think we should expand the StoreConsumerOffset command to allow for different levels of consistency such as no_ack or quorum, something that Kafka does for SendMessages. This way the auto_commit from the PollMessages command could inherit that as well.
Implement StoreConsumerOffset and DeleteConsumerOffset as replicated operations on partition. Additionally, move partitions to a consensus-per-partition model, rather than relying on one instance of consensus for all of the partitions.